
Fix async streaming hangs on dead connections #212

Draft

anticomputer wants to merge 12 commits into main from anticomputer/async-hang-fixes

Conversation

@anticomputer
Contributor

Add httpx timeout to AsyncOpenAI client and wrap stream event iteration with an idle timeout to detect and recover from dead connections stuck in CLOSE_WAIT state.

Copilot AI review requested due to automatic review settings April 9, 2026 05:37
Contributor

Copilot AI left a comment


Pull request overview

Adds guardrails to prevent async streaming runs from hanging indefinitely on dead HTTP connections by applying client-side HTTP timeouts and an application-level idle timeout around stream event iteration.

Changes:

  • Add httpx.Timeout(...) to the AsyncOpenAI client used by TaskAgent.
  • Introduce STREAM_IDLE_TIMEOUT and wrap stream_events() iteration in asyncio.wait_for(...) to detect stalled streams and raise APITimeoutError.
  • Log a specific error when the stream is idle beyond the configured threshold.
File summary:

  • src/seclab_taskflow_agent/runner.py: Adds a streaming idle-timeout loop around result.stream_events() to fail fast and retry when events stop arriving.
  • src/seclab_taskflow_agent/agent.py: Configures AsyncOpenAI with explicit httpx connect/read/write/pool timeouts to avoid hanging requests.

Copilot's findings


  • Files reviewed: 2/2 changed files
  • Comments generated: 3

Comment on lines 395 to 412
stream = result.stream_events()
async_iter = stream.__aiter__()
while True:
    try:
        event = await asyncio.wait_for(
            async_iter.__anext__(),
            timeout=STREAM_IDLE_TIMEOUT,
        )
    except StopAsyncIteration:
        break
    except asyncio.TimeoutError:
        logging.error(
            f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
            "connection likely dead, raising APITimeoutError"
        )
        raise APITimeoutError("Stream idle timeout exceeded")
    if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
        await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)

Copilot AI Apr 9, 2026


The manual __aiter__/__anext__ loop no longer guarantees the stream iterator is closed when exiting early (e.g., on asyncio.TimeoutError). Unlike async for, this can leave the async generator / underlying streaming run unclosed, potentially leaking tasks/sockets and undermining the goal of recovering from dead connections. Ensure the stream is explicitly closed in a finally (e.g., via contextlib.aclosing(...) or calling aclose()/a provided result cleanup API) before raising/retrying.

Suggested change:

stream = None
try:
    stream = result.stream_events()
    async_iter = stream.__aiter__()
    while True:
        try:
            event = await asyncio.wait_for(
                async_iter.__anext__(),
                timeout=STREAM_IDLE_TIMEOUT,
            )
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            logging.error(
                f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
                "connection likely dead, raising APITimeoutError"
            )
            raise APITimeoutError("Stream idle timeout exceeded")
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            await render_model_output(event.data.delta, async_task=async_task, task_id=task_id)
finally:
    if stream is not None:
        aclose = getattr(stream, "aclose", None)
        if aclose is not None:
            await aclose()

Contributor Author


Good catch — addressed in abb4ba3. Added finally block with aclose() to ensure the async generator is cleaned up on timeout/early exit.

Comment on lines +394 to +410
result = agent0.run_streamed(prompt, max_turns=max_turns)
stream = result.stream_events()
async_iter = stream.__aiter__()
while True:
    try:
        event = await asyncio.wait_for(
            async_iter.__anext__(),
            timeout=STREAM_IDLE_TIMEOUT,
        )
    except StopAsyncIteration:
        break
    except asyncio.TimeoutError:
        logging.error(
            f"Stream idle for {STREAM_IDLE_TIMEOUT}s — "
            "connection likely dead, raising APITimeoutError"
        )
        raise APITimeoutError("Stream idle timeout exceeded")

Copilot AI Apr 9, 2026


This change introduces a stream-idle timeout behavior but there are no unit tests covering the new retry path (timeout -> APITimeoutError -> retry) or verifying that iteration stops/cleans up correctly. Consider adding a focused test that stubs run_streamed().stream_events() to yield a couple events and then hang, asserting the idle timeout triggers and cleanup/retry behavior occurs as intended.

Contributor Author


This is a targeted hotfix for a production hang we hit during a live experiment — the timeout/retry path was validated manually against Python 3.13 (normal iteration, StopAsyncIteration propagation, TimeoutError, and cancellation safety). Happy to add a proper test suite in a follow-up but didn't want to block the fix on that.
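For the follow-up, such a test could be sketched along these lines, using stdlib-only stand-ins for run_streamed().stream_events() (hypothetical names, timeout shrunk from 1800s so it runs quickly):

```python
import asyncio

STREAM_IDLE_TIMEOUT = 0.05  # shrunk from 1800s so the test finishes quickly

async def stalled_stream():
    """Stub for run_streamed().stream_events(): two events, then a hang."""
    yield "event-1"
    yield "event-2"
    await asyncio.Event().wait()  # never set: simulates a dead connection

async def consume(stream):
    received = []
    async_iter = stream.__aiter__()
    try:
        while True:
            try:
                event = await asyncio.wait_for(
                    async_iter.__anext__(), timeout=STREAM_IDLE_TIMEOUT
                )
            except StopAsyncIteration:
                break
            received.append(event)
    finally:
        await async_iter.aclose()  # mirror the PR's cleanup on early exit
    return received

async def test_idle_timeout_fires():
    try:
        await consume(stalled_stream())
    except asyncio.TimeoutError:
        return "idle timeout fired"
    return "no timeout"

result = asyncio.run(test_idle_timeout_fires())
print(result)  # idle timeout fired
```

The same stub pattern extends to the retry path by wrapping consume() in the retry loop and counting attempts.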

Comment on lines 182 to 187
client = AsyncOpenAI(
    base_url=resolved_endpoint,
    api_key=resolved_token,
    default_headers=provider.extra_headers or None,
    timeout=httpx.Timeout(connect=10.0, read=300.0, write=300.0, pool=60.0),
)

Copilot AI Apr 9, 2026


The client read timeout is set to 300s while the runner’s STREAM_IDLE_TIMEOUT is 1800s. Since these are two independent timeouts affecting streaming behavior, the effective idle-kill behavior can become unclear to maintainers and may not match the intended 30-minute threshold. Consider centralizing/documenting the relationship between these timeouts (or aligning them) so the configured behavior is unambiguous.

Contributor Author


These are intentionally different — they guard different failure modes at different layers:

  • httpx.Timeout(read=300s) — TCP-level. Catches dead connections where the socket itself stops delivering bytes (CLOSE_WAIT). This is the first line of defense.
  • STREAM_IDLE_TIMEOUT(1800s) — Application-level. Catches hangs where the connection is technically alive but no events arrive (e.g. the async generator is stuck, or the server stops sending SSE frames while keeping the connection open).

The read timeout fires per individual socket read; the idle timeout fires when no complete event has been yielded for 30 minutes. In practice the httpx timeout catches most dead-connection cases and the idle timeout is a backstop for subtler hangs. I'll add a comment in the code clarifying the relationship.
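The distinction can be seen in miniature: wrapping each __anext__ in asyncio.wait_for bounds the gap between events, not total stream duration (toy numbers, stdlib only):

```python
import asyncio

IDLE_TIMEOUT = 0.1  # toy stand-in for STREAM_IDLE_TIMEOUT

async def steady_stream(n, gap):
    for i in range(n):
        await asyncio.sleep(gap)  # each event arrives within the idle window
        yield i

async def main():
    # Five events 0.03s apart: total runtime (~0.15s) exceeds IDLE_TIMEOUT,
    # but the per-event timer resets on every yield, so no timeout fires.
    it = steady_stream(5, 0.03).__aiter__()
    got = []
    while True:
        try:
            got.append(await asyncio.wait_for(it.__anext__(), timeout=IDLE_TIMEOUT))
        except StopAsyncIteration:
            break
    return got

got = asyncio.run(main())
print(got)  # [0, 1, 2, 3, 4]
```

A slow-but-steady stream survives the idle timeout indefinitely; only a genuine stall trips it.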

Copilot AI review requested due to automatic review settings April 9, 2026 05:46
Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

kevinbackhouse
kevinbackhouse previously approved these changes Apr 9, 2026
@anticomputer anticomputer marked this pull request as ready for review April 9, 2026 14:51
@anticomputer anticomputer marked this pull request as draft April 9, 2026 14:53
Copilot AI review requested due to automatic review settings April 9, 2026 16:39
Contributor

Copilot AI left a comment


Pull request overview

This PR aims to prevent async streaming runs from hanging indefinitely on dead or stalled connections by adding timeouts and ensuring network/session resources are cleaned up reliably.

Changes:

  • Add an application-level stream idle timeout around streamed event iteration.
  • Configure AsyncOpenAI with explicit httpx.Timeout values to detect dead sockets sooner.
  • Add explicit cleanup steps (close OpenAI client; cancel MCP session task) to avoid dangling sockets/tasks.
File summary:

  • src/seclab_taskflow_agent/runner.py: Adds stream idle timeout loop, closes the agent client in finally, and attempts to cancel MCP session tasks during cleanup.
  • src/seclab_taskflow_agent/agent.py: Adds httpx client timeouts to AsyncOpenAI and introduces an async close() method to release the underlying connection pool.

Copilot's findings


  • Files reviewed: 2/2 changed files
  • Comments generated: 3

Comment on lines 369 to 377
system_prompt = mcp_system_prompt(
    primary_personality.personality,
    primary_personality.task,
    server_prompts=server_prompts,
    important_guidelines=important_guidelines,
)
agent0 = None
agent0 = TaskAgent(
    name=primary_name,

Copilot AI Apr 9, 2026


agent0 is referenced in the outer finally block, but it’s only initialized to None here after multiple earlier operations in the try block (e.g., building handoff agents / system prompt). If an exception is raised before this line runs, the finally block will raise UnboundLocalError when checking agent0, preventing cleanup. Initialize agent0 to None at the very start of the outer try (or before it) and then assign the TaskAgent once created; also avoids the current redundant double-assignment.

if stream is not None:
    aclose = getattr(stream, "aclose", None)
    if aclose is not None:
        await aclose()

Copilot AI Apr 9, 2026


The finally cleanup calls await aclose() directly. Any exception raised during aclose() will override the original streaming error/timeout and can break the retry path. Consider wrapping the aclose() call in a try/except (logging on failure) so cleanup failures don’t mask the root cause.

Suggested change:

try:
    await aclose()
except Exception:
    logging.exception("Failed to close streamed response")

Comment on lines +483 to +485
    await mcp_sessions
except (asyncio.CancelledError, Exception):
    pass

Copilot AI Apr 9, 2026


After mcp_sessions.cancel(), the code does await mcp_sessions with no timeout. This can hang indefinitely if the session task suppresses cancellation during cleanup (it catches CancelledError inside its cleanup loop). Use a bounded wait (e.g., asyncio.wait_for(..., timeout=MCP_CLEANUP_TIMEOUT)) and log if cancellation doesn’t complete, so shutdown can’t re-hang in the cleanup path.

Suggested change:

    await asyncio.wait_for(mcp_sessions, timeout=MCP_CLEANUP_TIMEOUT)
except asyncio.TimeoutError:
    logging.warning(
        "Timed out waiting for MCP session task cancellation after %s seconds",
        MCP_CLEANUP_TIMEOUT,
    )
except asyncio.CancelledError:
    pass
except Exception:
    logging.exception("Exception while waiting for mcp session task cancellation")

Copilot AI review requested due to automatic review settings April 9, 2026 17:13
Contributor

Copilot AI left a comment


Pull request overview

This PR aims to prevent async streaming runs from hanging indefinitely on dead/stalled connections by adding explicit network timeouts and an application-level idle timeout, plus more aggressive cleanup of long-lived async tasks/clients.

Changes:

  • Add an application-level stream idle timeout around stream_events() iteration to detect “no events yielded” hangs and convert them into APITimeoutError retries.
  • Configure AsyncOpenAI with an explicit httpx.Timeout and add TaskAgent.close() to release the underlying httpx connection pool.
  • Add extra cleanup/cancellation for the MCP session task; CLI currently force-exits on success.
File summary:

  • src/seclab_taskflow_agent/runner.py: Adds stream idle timeout loop + closes the primary OpenAI client + cancels MCP session task.
  • src/seclab_taskflow_agent/agent.py: Adds httpx client timeouts and introduces a close() method to dispose the AsyncOpenAI client.
  • src/seclab_taskflow_agent/cli.py: Force-exits after successful asyncio.run() to avoid lingering spin/hangs.

Copilot's findings


  • Files reviewed: 3/3 changed files
  • Comments generated: 2

Comment on lines +163 to +165
# Force-exit on success to prevent asyncio event loop spin on
# dangling connections/tasks that survive cleanup.
os._exit(0)

Copilot AI Apr 9, 2026


Calling os._exit(0) on success will bypass normal interpreter shutdown (no flushing stdio buffers, no finally/atexit handlers, no coverage/profiling hooks), which can truncate output and make the CLI unsafe to embed/invoke from tests or other Python code. Prefer fixing the underlying dangling-task issue (which this PR already addresses) and exiting normally (return from main / let asyncio.run finish, or raise typer.Exit(0) / sys.exit(0) if an explicit exit is needed).

Suggested change:

return

Comment on lines 463 to +467
finally:
    # Close the AsyncOpenAI client to release httpx connection pool.
    # Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
    if agent0 is not None:
        await agent0.close()

Copilot AI Apr 9, 2026


This finally only closes the primary TaskAgent’s AsyncOpenAI client. Handoff agents are currently created via TaskAgent(...).agent, which keeps the underlying AsyncOpenAI client alive but makes it impossible to close() later, so their httpx connection pools can still leak and reintroduce the same CLOSE_WAIT/kqueue-spin behavior. Consider retaining the handoff TaskAgent wrappers (or their clients) and closing all of them here; also wrap agent0.close() in a try/except so a close failure doesn’t mask an earlier exception or prevent MCP cleanup.

Copilot AI review requested due to automatic review settings April 9, 2026 18:27
Contributor

Copilot AI left a comment


Pull request overview

This PR aims to prevent async streaming runs from hanging indefinitely on dead/stalled connections by adding both transport-level and application-level timeouts, plus additional cleanup to avoid lingering tasks/sockets.

Changes:

  • Add an httpx.Timeout to the AsyncOpenAI client used by TaskAgent.
  • Add a 30-minute stream-idle backstop around stream_events() iteration (with explicit stream closing).
  • Add additional shutdown cleanup (closing the OpenAI client, cancelling MCP session tasks) and a forced process exit on successful completion.
File summary:

  • src/seclab_taskflow_agent/runner.py: Adds stream idle timeout handling, closes the agent client in finally, cancels MCP session task, and introduces a forced os._exit(0) on success.
  • src/seclab_taskflow_agent/agent.py: Configures AsyncOpenAI with an explicit httpx.Timeout and adds a TaskAgent.close() to release the httpx connection pool.

Copilot's findings


  • Files reviewed: 2/2 changed files
  • Comments generated: 2

Comment on lines +465 to +468
# Close the AsyncOpenAI client to release httpx connection pool.
# Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
if agent0 is not None:
    await agent0.close()

Copilot AI Apr 9, 2026


Only the primary TaskAgent’s AsyncOpenAI client is being closed here. Any additional handoff agents created earlier via TaskAgent(...).agent will still retain their own AsyncOpenAI/httpx pools (the wrapper instance is discarded, but the Agent keeps a reference to the client through its model). To fully prevent lingering CLOSE_WAIT sockets / event-loop spin, track and close all TaskAgent-owned clients (or share a single client across agents) in this finally block.

Suggested change:

# Close all AsyncOpenAI clients reachable from the task agent graph to
# release every httpx connection pool created for handoff agents too.
# Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
if agent0 is not None:
    root_agent = getattr(agent0, "agent", None)
    agents_to_visit: list[Agent[Any]] = []
    if root_agent is not None:
        agents_to_visit.append(root_agent)
    seen_agent_ids: set[int] = set()
    extra_clients: list[Any] = []
    seen_client_ids: set[int] = set()
    primary_client = getattr(getattr(root_agent, "model", None), "client", None)
    if primary_client is not None:
        seen_client_ids.add(id(primary_client))
    while agents_to_visit:
        current_agent = agents_to_visit.pop()
        current_agent_id = id(current_agent)
        if current_agent_id in seen_agent_ids:
            continue
        seen_agent_ids.add(current_agent_id)
        current_client = getattr(getattr(current_agent, "model", None), "client", None)
        if current_client is not None and id(current_client) not in seen_client_ids:
            seen_client_ids.add(id(current_client))
            extra_clients.append(current_client)
        for handoff in getattr(current_agent, "handoffs", ()) or ():
            handoff_agent = getattr(handoff, "agent", handoff)
            if handoff_agent is not None:
                agents_to_visit.append(handoff_agent)
    await agent0.close()
    for client in extra_clients:
        close_client = getattr(client, "close", None)
        if close_client is None:
            continue
        try:
            await close_client()
        except Exception:
            logging.exception("Exception while closing handoff agent client")

Comment on lines +776 to +782
# Force-exit after successful completion only: asyncio.run() cleanup
# spins on dangling tasks/connections from the responses API path.
# Failure paths (must_complete break, personality mode) use normal exit.
if taskflow_path and session is not None and session.finished:
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0)

Copilot AI Apr 9, 2026


Using os._exit(0) inside run_main force-terminates the entire process and bypasses normal shutdown (pending finally blocks in other code, atexit handlers, log handler flush/close, coverage hooks, etc.). This makes run_main unsafe to call from any embedded/programmatic context and can cause silent data loss. Prefer fixing the underlying dangling task/connection issue (or gating a forced exit behind a CLI-only flag/environment variable, outside the exported API).

Suggested change:

# Successful completion should return normally so embedded/programmatic
# callers retain control of process shutdown and normal cleanup runs.
if taskflow_path and session is not None and session.finished:
    sys.stdout.flush()
    sys.stderr.flush()
    return

A daemon thread monitors last-activity timestamps and calls os._exit(2)
if no progress for WATCHDOG_IDLE_TIMEOUT (default 30 min, env override).

Pings are placed in the streaming loop, tool call hooks, and MCP cleanup.
This covers every hang variant: dead connections, asyncio cleanup spin,
MCP shutdown, etc. — the thread runs outside asyncio and cannot be blocked
by event loop issues.
Copilot AI review requested due to automatic review settings April 9, 2026 21:07
Contributor

Copilot AI left a comment


Pull request overview

This PR aims to prevent async streaming runs from hanging indefinitely on dead/stalled connections by adding layered timeouts and more aggressive cleanup/termination behavior around streaming and MCP session lifecycle management.

Changes:

  • Add an httpx.Timeout to the AsyncOpenAI client and expose a TaskAgent.close() to release the underlying httpx connection pool.
  • Wrap streaming event iteration with an application-level idle timeout to detect streams that stop yielding events.
  • Add a watchdog thread and additional cleanup/cancellation logic intended to prevent event-loop spins and stuck tasks.
File summary:

  • src/seclab_taskflow_agent/runner.py: Adds stream idle timeout, watchdog thread, extra cleanup/cancellation, and a forced process-exit path to avoid hangs/spins.
  • src/seclab_taskflow_agent/agent.py: Configures AsyncOpenAI with explicit httpx timeouts and adds a close method to release the httpx pool.

Copilot's findings


Comments suppressed due to low confidence (3)

src/seclab_taskflow_agent/runner.py:828

  • run_main now unconditionally evaluates session when computing the os._exit(...) status. In personality-only mode (-p without taskflow/resume), the session variable is never defined, so this will raise UnboundLocalError right before exiting. Initialize session: TaskflowSession | None = None near the top of run_main (outside the conditional), or guard this exit path when no session exists.
    # Force-exit to prevent asyncio event loop spin on dangling
    # tasks/connections from the responses API path. Flush first.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0 if (session is None or session.finished) else 1)

src/seclab_taskflow_agent/runner.py:828

  • Calling os._exit(...) inside run_main is a hard process termination that bypasses normal exception propagation, asyncio.run teardown, and any caller cleanup. Since run_main is re-exported for backwards compatibility and can be imported as a library API, this is a breaking behavior change; prefer returning normally and letting the CLI layer decide exit codes (e.g., by raising typer.Exit in cli.py or returning a status).
    # Force-exit to prevent asyncio event loop spin on dangling
    # tasks/connections from the responses API path. Flush first.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0 if (session is None or session.finished) else 1)

src/seclab_taskflow_agent/runner.py:510

  • Only the primary TaskAgent (agent0) is closed here, but the handoff agents are created via TaskAgent(...).agent earlier, which discards the wrapper and leaves their underlying AsyncOpenAI / httpx pools without an explicit close. If the goal is to eliminate stuck sockets/CPU spin, consider retaining the TaskAgent wrappers for handoffs too and closing all of them in this finally block.
    finally:
        # Close the AsyncOpenAI client to release httpx connection pool.
        # Dead CLOSE_WAIT sockets in the pool cause kqueue CPU spin if left open.
        watchdog_ping()
        if agent0 is not None:
            await agent0.close()
  • Files reviewed: 2/2 changed files
  • Comments generated: 2

Comment on lines 410 to 418
system_prompt = mcp_system_prompt(
    primary_personality.personality,
    primary_personality.task,
    server_prompts=server_prompts,
    important_guidelines=important_guidelines,
)
agent0 = None
agent0 = TaskAgent(
    name=primary_name,

Copilot AI Apr 9, 2026


agent0 is initialized to None only after several operations that can raise (e.g., building prompts / handoff agent setup). If an exception occurs before line 416 executes, the finally block later will reference an unbound local agent0 when attempting to close it, raising UnboundLocalError and masking the original failure. Define agent0: TaskAgent | None = None before the outer try: (or otherwise ensure it is always bound) before entering code that may throw.

This issue also appears in the following locations of the same file:

  • line 505
  • line 824

Comment on lines +96 to +99
def start_watchdog(timeout: int = WATCHDOG_IDLE_TIMEOUT) -> None:
    """Start the watchdog thread (idempotent, daemon thread)."""
    t = threading.Thread(target=_watchdog_thread, args=(timeout,), daemon=True)
    t.start()

Copilot AI Apr 9, 2026


start_watchdog is documented as idempotent, but the current implementation always spawns a new daemon thread on every call. Also, _watchdog_last_activity is initialized at module import time and start_watchdog() does not reset it, so if this module is imported and start_watchdog() is invoked later than timeout seconds, the watchdog can force-exit almost immediately. Consider (1) tracking a module-level started flag/thread to make this truly idempotent, and (2) calling watchdog_ping() (or otherwise resetting the timestamp) inside start_watchdog() before starting the thread.
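A hedged sketch of the suggested fix (names mirror the PR but are assumptions; os._exit is replaced by a callback so the sketch can be exercised safely):

```python
import threading
import time

_watchdog_last_activity = time.monotonic()
_watchdog_lock = threading.Lock()
_watchdog_thread_handle = None

def watchdog_ping():
    global _watchdog_last_activity
    _watchdog_last_activity = time.monotonic()

def _watchdog_loop(timeout, on_idle):
    while True:
        time.sleep(0.01)
        if time.monotonic() - _watchdog_last_activity > timeout:
            on_idle()  # production code would call os._exit(2) here
            return

def start_watchdog(timeout, on_idle):
    """Truly idempotent: only the first call spawns the thread."""
    global _watchdog_thread_handle
    with _watchdog_lock:
        if _watchdog_thread_handle is not None:
            return _watchdog_thread_handle
        watchdog_ping()  # reset the timestamp so import-time staleness can't fire
        _watchdog_thread_handle = threading.Thread(
            target=_watchdog_loop, args=(timeout, on_idle), daemon=True
        )
        _watchdog_thread_handle.start()
        return _watchdog_thread_handle

fired = threading.Event()
t1 = start_watchdog(0.05, fired.set)
t2 = start_watchdog(0.05, fired.set)
print(t1 is t2)        # True: second call reuses the first thread
fired.wait(2.0)
print(fired.is_set())  # True: idle threshold exceeded with no pings
```

The module-level handle makes repeated start calls harmless, and the ping before thread start closes the import-time staleness window.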

@anticomputer
Contributor Author

What this PR fixes

When using the responses API, the agent process can hang indefinitely in several ways. This PR adds layered defenses against all observed variants.

Bug 1: Dead TCP connections (CLOSE_WAIT spin)

When the API server drops a streaming connection, the TCP socket enters CLOSE_WAIT state. The asyncio event loop busy-polls the dead fd via kqueue, pinning the CPU at 40-50% per process. The streaming iterator blocks forever on an internal event queue because the feeder task is stuck on the dead socket.

Fix: Add httpx.Timeout(connect=10, read=300, write=300, pool=60) to the AsyncOpenAI client as a first line of defense against dead connections at the TCP level.

Bug 2: Stream idle hang

Even with httpx timeouts, the streaming generator can stall if the connection stays alive but the server stops producing events. The existing retry loop only catches exceptions, not silent stalls.

Fix: Wrap each __anext__() call in asyncio.wait_for with a 30-minute STREAM_IDLE_TIMEOUT. On timeout, raise APITimeoutError to feed into the existing retry loop (MAX_API_RETRY=5). Also call aclose() on the stream in a finally block to release resources.

Bug 3: Asyncio event loop spin on process exit

After run_main completes (success or failure), asyncio.run() attempts to cancel remaining tasks and close the event loop. Dangling connections and tasks from the responses API path cause this cleanup phase to spin indefinitely at high CPU.

Fix: Call os._exit() at the end of run_main with the correct exit code, before returning to asyncio.run()'s cleanup. All session state, logs, and checkpoints are persisted before this point. stdout/stderr are flushed before exiting.

Bug 4: Hangs inside deploy_task_agents

The os._exit at the end of run_main only fires after all tasks complete. For multi-task workflows like audit_issue_local_iter, if any single deploy_task_agents call hangs (in streaming, MCP cleanup, or anywhere else), run_main never reaches os._exit and the process spins indefinitely.

Fix: A watchdog daemon thread that monitors last-activity timestamps from outside asyncio. Pings are placed in the streaming loop, tool call hooks, and MCP cleanup entry. If no activity for WATCHDOG_IDLE_TIMEOUT (default 35 min, configurable via env var), the thread calls os._exit(2). The timeout is set 5 minutes above STREAM_IDLE_TIMEOUT so the in-loop timeout gets first shot at a graceful retry.

Design

These are defense-in-depth layers, not redundant fixes:

  • httpx.Timeout: catches dead sockets at the transport level
  • STREAM_IDLE_TIMEOUT: catches stalled streams at the application level, enables retry
  • os._exit in run_main: prevents post-completion asyncio cleanup spin
  • Watchdog thread: catches any hang variant from outside the event loop

Each layer addresses a failure mode the others cannot. If the upstream SDK fixes its asyncio cleanup, all four can be removed without functional impact.
